use super::{const_buf, const_void, mut_buf, mut_void, AioFd, Fd, SocketAddr};
use crate::runtime::Extensions;
use crate::{Error, Result};
use core::mem::{self, MaybeUninit};
use core::time::Duration;
use hierr::set_errno;

/// Re-export of `libc::SHUT_RD`; usable as the `how` argument of [`Fd::shutdown`].
pub const SHUT_RD: i32 = libc::SHUT_RD;
/// Re-export of `libc::SHUT_WR`; usable as the `how` argument of [`Fd::shutdown`].
pub const SHUT_WR: i32 = libc::SHUT_WR;
/// Re-export of `libc::SHUT_RDWR`; usable as the `how` argument of [`Fd::shutdown`].
pub const SHUT_RDWR: i32 = libc::SHUT_RDWR;

impl Fd {
    /// 对应libc::socket接口, 被设置为非阻塞模式.
    pub fn socket(family: i32, ty: i32, proto: i32) -> Result<Self> {
        let fd =
            unsafe { libc::socket(family, ty | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC, proto) };
        if fd >= 0 {
            Ok(Self::new(fd))
        } else {
            Err(Error::last())
        }
    }

    /// Creates a `SOCK_STREAM` socket for `family` through [`Fd::socket`].
    ///
    /// The descriptor is non-blocking and close-on-exec.
    pub fn tcp_socket(family: i32) -> Result<Self> {
        let ty = libc::SOCK_STREAM | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
        Self::socket(family, ty, 0)
    }

    /// Creates a `SOCK_DGRAM` socket for `family` through [`Fd::socket`].
    ///
    /// The descriptor is non-blocking and close-on-exec.
    pub fn udp_socket(family: i32) -> Result<Self> {
        let ty = libc::SOCK_DGRAM | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
        Self::socket(family, ty, 0)
    }

    /// 设置阻塞模式.
    pub fn set_nonblocking(&self, nonblocking: bool) -> Result<()> {
        let flags = unsafe { libc::fcntl(self.fd, libc::F_GETFL) };
        let flags = if nonblocking {
            flags | libc::O_NONBLOCK
        } else {
            flags & !libc::O_NONBLOCK
        };
        let ret = unsafe { libc::fcntl(self.fd, libc::F_SETFL, flags) };
        if ret == 0 {
            return Ok(());
        }
        Err(Error::last())
    }

    /// 对应libc::shutdown功能.
    pub fn shutdown(&self, how: i32) -> Result<()> {
        if unsafe { libc::shutdown(self.fd, how) } == 0 {
            return Ok(());
        }
        Err(Error::last())
    }

    /// 对应libc::bind接口.
    pub fn bind(&self, addr: &SocketAddr) -> Result<()> {
        let (addr, addrlen) = addr.get();
        let ret = unsafe { libc::bind(self.fd, addr, addrlen) };
        if ret != 0 {
            return Err(Error::last());
        }
        Ok(())
    }

    /// 对应libc::listen接口.
    pub fn listen(&self, mut backlog: i32) -> Result<()> {
        if backlog <= 0 {
            backlog = libc::SOMAXCONN;
        }
        let ret = unsafe { libc::listen(self.fd, backlog) };
        if ret != 0 {
            return Err(Error::last());
        }
        Ok(())
    }

    /// 构建了一个tcp监听端口，设置了SO_REUSEADDR为true.
    pub fn tcp_server(addr: &SocketAddr) -> Result<Self> {
        Self::tcp_server_then(addr, |fd| {
            fd.setsockopt_i32(libc::SOL_SOCKET, libc::SO_REUSEADDR, true)
        })
    }

    /// Builds a TCP listening socket on `addr`, letting the caller customise it.
    ///
    /// `f` is invoked on the freshly created socket before `bind`/`listen`, so
    /// it can apply extra options (for example through `setsockopt_i32`).
    /// `listen` is called with a backlog of 0, which [`Fd::listen`] maps to
    /// `SOMAXCONN`.
    ///
    /// # Errors
    /// Propagates the first failure from socket creation, `f`, `bind` or `listen`.
    pub fn tcp_server_then<F>(addr: &SocketAddr, f: F) -> Result<Self>
    where
        F: FnOnce(&Fd) -> Result<()>,
    {
        let listener = Self::socket(addr.family(), libc::SOCK_STREAM, 0)?;
        // Caller hook runs first so options take effect before binding.
        f(&listener)?;
        listener.bind(addr)?;
        listener.listen(0)?;
        Ok(listener)
    }

    /// Creates a non-blocking TCP client socket.
    ///
    /// `addr` is the optional local address; when `None`, nothing is bound here
    /// and the system assigns the local address.
    ///
    /// # Errors
    /// Propagates failures from socket creation or from binding `addr`.
    #[deprecated(note = "should instead use tcp_connect")]
    pub fn tcp_client(family: i32, addr: Option<&SocketAddr>) -> Result<Self> {
        let sock = Self::socket(family, libc::SOCK_STREAM, 0)?;
        match addr {
            Some(local) => sock.bind(local)?,
            None => {}
        }
        Ok(sock)
    }

    /// 新建一个连接，本地地址可以不指定，由系统自动分配. 工作于非阻塞模式.
    pub fn tcp_connect(local: Option<&SocketAddr>, remote: &SocketAddr) -> Result<TcpClient> {
        Self::tcp_connect_then(local, remote, |_| Ok(()))
    }

    /// Starts a non-blocking TCP connection to `remote`, with a caller hook.
    ///
    /// `f` runs on the new socket before any bind/connect so it can apply
    /// extra options (for example through `setsockopt_i32`). `local` is the
    /// optional local address; when `None` the system assigns it.
    ///
    /// The returned [`TcpClient`] records whether the connection was already
    /// established (`try_connect` succeeded) or is still in progress
    /// (`EINPROGRESS`).
    ///
    /// # Errors
    /// Propagates failures from socket creation, `f`, `bind`, or any
    /// `try_connect` error other than `EINPROGRESS`.
    pub fn tcp_connect_then<F>(
        local: Option<&SocketAddr>,
        remote: &SocketAddr,
        f: F,
    ) -> Result<TcpClient>
    where
        F: FnOnce(&Fd) -> Result<()>,
    {
        let sock = Self::socket(remote.family(), libc::SOCK_STREAM, 0)?;
        f(&sock)?;
        if let Some(bind_addr) = local {
            sock.bind(bind_addr)?;
        }
        let connected = match sock.try_connect(remote) {
            Ok(()) => true,
            Err(e) if e.errno == libc::EINPROGRESS => false,
            Err(e) => return Err(e),
        };
        Ok(TcpClient::new(sock, connected))
    }

    /// Wrapper over `libc::accept4`.
    ///
    /// The accepted descriptor is created with `SOCK_NONBLOCK | SOCK_CLOEXEC`.
    /// The call is retried while it fails with `EINTR`.
    ///
    /// # Errors
    /// Returns any other `Error::last()` value unchanged (including `EAGAIN`
    /// when no connection is pending).
    pub fn try_accept(&self) -> Result<(Fd, SocketAddr)> {
        let mut peer = SocketAddr::uninit();
        let (peer_ptr, mut peer_len) = peer.get_uninit_mut();
        loop {
            let raw = unsafe {
                libc::accept4(
                    self.fd(),
                    peer_ptr,
                    &mut peer_len,
                    libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
                )
            };
            if raw >= 0 {
                return Ok((Fd::new(raw), peer));
            }
            let err = Error::last();
            if err.errno != libc::EINTR {
                return Err(err);
            }
            // Interrupted by a signal: try again.
        }
    }

    /// 对应libc::connect. 对错误码做了归一处理，任何情况下，只要连接未完成，错误码对应EINPROGRESS.
    pub fn try_connect(&self, saddr: &SocketAddr) -> Result<()> {
        loop {
            let (addr, addrlen) = saddr.get();
            let ret = unsafe { libc::connect(self.fd, addr, addrlen) };
            if ret == 0 {
                return Ok(());
            }
            match Error::last() {
                Error {
                    errno: libc::EINPROGRESS,
                }
                | Error {
                    errno: libc::EALREADY,
                } => return Err(Error::new(libc::EINPROGRESS)),
                Error {
                    errno: libc::EAGAIN,
                } if saddr.family() == libc::AF_UNIX => return Err(Error::new(libc::EINPROGRESS)),
                Error {
                    errno: libc::EISCONN,
                } => return Ok(()),
                Error { errno: libc::EINTR } => continue,
                e @ _ => return Err(e),
            }
        }
    }

    /// Sets the `SO_LINGER` socket option.
    ///
    /// `Some(d)` enables lingering with a timeout of `d` in whole seconds
    /// (sub-second precision is dropped); `None` disables it.
    ///
    /// This is best-effort: the function returns nothing, so a failing
    /// `setsockopt` is deliberately ignored. Use [`Fd::get_linger`] to read
    /// back the effective value.
    pub fn set_linger(&self, time: Option<Duration>) {
        let linger = match time {
            Some(tm) => libc::linger {
                l_onoff: 1,
                // Saturate rather than wrap: a plain `as i32` turns very large
                // durations into negative or tiny timeouts.
                l_linger: tm.as_secs().min(i32::MAX as u64) as i32,
            },
            None => libc::linger {
                l_onoff: 0,
                l_linger: 0,
            },
        };
        // SAFETY: `linger` outlives the call and the length passed is its size.
        let _ = unsafe {
            libc::setsockopt(
                self.fd,
                libc::SOL_SOCKET,
                libc::SO_LINGER,
                const_void(&linger),
                mem::size_of_val(&linger) as u32,
            )
        };
    }

    /// 获取linger设置.
    pub fn get_linger(&self) -> Result<Option<Duration>> {
        let mut linger = libc::linger {
            l_onoff: 0,
            l_linger: 0,
        };
        let mut len = mem::size_of_val(&linger) as u32;
        let ret = unsafe {
            libc::getsockopt(
                self.fd,
                libc::SOL_SOCKET,
                libc::SO_LINGER,
                mut_void(&mut linger),
                &mut len,
            )
        };
        if ret == 0 {
            if linger.l_onoff > 0 {
                Ok(Some(Duration::new(linger.l_linger as u64, 0)))
            } else {
                Ok(None)
            }
        } else {
            Err(Error::last())
        }
    }

    /// 获取socket错误码.
    pub fn get_sock_error(&self) -> Result<()> {
        let Ok(val) = self.getsockopt_i32(libc::SOL_SOCKET, libc::SO_ERROR) else {
            return Err(Error::last());
        };
        if val == 0 {
            return Ok(());
        }
        set_errno(val);
        Err(Error::new(val))
    }

    /// 对应libc::setsockopt，仅支持值类型为i32的选项.
    pub fn setsockopt_i32<T: Into<i32>>(&self, level: i32, opt: i32, value: T) -> Result<()> {
        let value: i32 = value.into();
        let ret = unsafe {
            libc::setsockopt(
                self.fd,
                level,
                opt,
                const_void(&value),
                mem::size_of::<i32>() as u32,
            )
        };
        if ret == 0 {
            Ok(())
        } else {
            Err(Error::last())
        }
    }

    /// 对应libc::getsockopt，仅支持值类型为i32的选项.
    pub fn getsockopt_i32<T: From<i32>>(&self, level: i32, opt: i32) -> Result<T> {
        let mut val: i32 = 0;
        let mut len = mem::size_of::<i32>() as u32;
        let ret = unsafe { libc::getsockopt(self.fd, level, opt, mut_void(&mut val), &mut len) };

        if ret == 0 {
            Ok(val.into())
        } else {
            Err(Error::last())
        }
    }

    /// 获取本地地址.
    pub fn getsockname(&self) -> Result<SocketAddr> {
        let mut sockaddr = SocketAddr::uninit();
        let (addr, mut addrlen) = sockaddr.get_uninit_mut();
        let ret = unsafe { libc::getsockname(self.fd, addr, &mut addrlen) };
        if ret == 0 {
            return Ok(sockaddr);
        }
        Err(Error::last())
    }

    /// 获取对端地址.
    pub fn getpeername(&self) -> Result<SocketAddr> {
        let mut sockaddr = SocketAddr::uninit();
        let (addr, mut addrlen) = sockaddr.get_uninit_mut();
        let ret = unsafe { libc::getpeername(self.fd, addr, &mut addrlen) };
        if ret == 0 {
            return Ok(sockaddr);
        }
        Err(Error::last())
    }

    /// Wrapper over `libc::sendto`, sending `buf` to `dst`.
    ///
    /// `MSG_DONTWAIT` is always added to `flags`. The call is driven through
    /// `try_write_do`, which produces the returned byte count or error.
    pub fn try_sendto(&self, buf: &[u8], flags: i32, dst: &SocketAddr) -> Result<usize> {
        let (dst_addr, dst_len) = dst.get();
        let send_flags = flags | libc::MSG_DONTWAIT;

        self.try_write_do(|| unsafe {
            libc::sendto(self.fd, const_buf(buf), buf.len(), send_flags, dst_addr, dst_len)
        })
    }

    /// Wrapper over `libc::sendmsg` with an explicit destination address.
    ///
    /// `off` is the starting offset of the data to send within `buf`.
    /// `MSG_DONTWAIT` is always added to `flags`. The iovec array handed to
    /// the closure is prepared by `try_writev_do`.
    pub fn try_sendmsgto(
        &self,
        buf: &[&[u8]],
        off: usize,
        flags: i32,
        dst: &SocketAddr,
    ) -> Result<usize> {
        let send_flags = flags | libc::MSG_DONTWAIT;
        let (dst_addr, dst_len) = dst.get();
        // SAFETY: a zero-filled `msghdr` (null pointers, zero lengths) is a valid value.
        let mut hdr = unsafe { MaybeUninit::<libc::msghdr>::zeroed().assume_init() };
        hdr.msg_name = (dst_addr as *const libc::sockaddr)
            .cast_mut()
            .cast::<libc::c_void>();
        hdr.msg_namelen = dst_len;

        self.try_writev_do(buf, off, |iov, cnt| {
            hdr.msg_iov = iov.cast_mut();
            hdr.msg_iovlen = cnt as usize;
            unsafe { libc::sendmsg(self.fd, &hdr, send_flags) }
        })
    }

    /// Wrapper over `libc::recvfrom`.
    ///
    /// `MSG_DONTWAIT` is always added to `flags`. On success returns the byte
    /// count produced by `try_read_do` together with the sender address buffer
    /// that was passed to the call.
    pub fn try_recvfrom(&self, buf: &mut [u8], flags: i32) -> Result<(usize, SocketAddr)> {
        let recv_flags = flags | libc::MSG_DONTWAIT;
        let mut sender = SocketAddr::uninit();
        let (sender_ptr, mut sender_len) = sender.get_uninit_mut();

        let received = self.try_read_do(|| unsafe {
            libc::recvfrom(
                self.fd,
                mut_buf(buf),
                buf.len(),
                recv_flags,
                sender_ptr,
                &mut sender_len,
            )
        })?;
        Ok((received, sender))
    }

    /// Wrapper over `libc::recvmsg` that also reports the sender address.
    ///
    /// `off` is the starting offset within `buf` at which data is written.
    /// `MSG_DONTWAIT` is always added to `flags`. The iovec array handed to
    /// the closure is prepared by `try_readv_do`.
    pub fn try_recvmsgfrom(
        &self,
        buf: &[&mut [u8]],
        off: usize,
        flags: i32,
    ) -> Result<(usize, SocketAddr)> {
        let recv_flags = flags | libc::MSG_DONTWAIT;
        let mut sender = SocketAddr::uninit();
        let (sender_ptr, sender_len) = sender.get_uninit_mut();
        // SAFETY: a zero-filled `msghdr` (null pointers, zero lengths) is a valid value.
        let mut hdr = unsafe { MaybeUninit::<libc::msghdr>::zeroed().assume_init() };
        hdr.msg_name = sender_ptr.cast::<libc::c_void>();
        hdr.msg_namelen = sender_len;

        let received = self.try_readv_do(buf, off, |iov, cnt| {
            hdr.msg_iov = iov.cast_mut();
            hdr.msg_iovlen = cnt as usize;
            unsafe { libc::recvmsg(self.fd, &mut hdr, recv_flags) }
        })?;
        Ok((received, sender))
    }

    /// Wrapper over `libc::recv`.
    ///
    /// `MSG_DONTWAIT` is always added to `flags`. The call is driven through
    /// `try_read_do`, which produces the returned byte count or error.
    pub fn try_recv(&self, buf: &mut [u8], flags: i32) -> Result<usize> {
        let recv_flags = flags | libc::MSG_DONTWAIT;
        self.try_read_do(|| unsafe {
            libc::recv(self.fd, mut_buf(buf), buf.len(), recv_flags)
        })
    }

    /// Wrapper over `libc::recvmsg`.
    ///
    /// `off` is the starting offset within `buf` at which data is written.
    /// `MSG_DONTWAIT` is always added to `flags`. The iovec array handed to
    /// the closure is prepared by `try_readv_do`.
    pub fn try_recvmsg(&self, buf: &[&mut [u8]], off: usize, flags: i32) -> Result<usize> {
        let recv_flags = flags | libc::MSG_DONTWAIT;
        // SAFETY: a zero-filled `msghdr` (null pointers, zero lengths) is a valid value.
        let mut hdr = unsafe { MaybeUninit::<libc::msghdr>::zeroed().assume_init() };
        self.try_readv_do(buf, off, |iov, cnt| {
            hdr.msg_iov = iov.cast_mut();
            hdr.msg_iovlen = cnt as usize;
            unsafe { libc::recvmsg(self.fd, &mut hdr, recv_flags) }
        })
    }

    /// Wrapper over `libc::send`.
    ///
    /// `MSG_DONTWAIT` is always added to `flags`. The call is driven through
    /// `try_write_do`, which produces the returned byte count or error.
    pub fn try_send(&self, buf: &[u8], flags: i32) -> Result<usize> {
        let send_flags = flags | libc::MSG_DONTWAIT;
        self.try_write_do(|| unsafe {
            libc::send(self.fd, const_buf(buf), buf.len(), send_flags)
        })
    }

    /// Wrapper over `libc::sendmsg`.
    ///
    /// `off` is the starting offset of the data to send within `buf`.
    /// `MSG_DONTWAIT` is always added to `flags`. The iovec array handed to
    /// the closure is prepared by `try_writev_do`.
    pub fn try_sendmsg(&self, buf: &[&[u8]], off: usize, flags: i32) -> Result<usize> {
        let send_flags = flags | libc::MSG_DONTWAIT;
        // SAFETY: a zero-filled `msghdr` (null pointers, zero lengths) is a valid value.
        let mut hdr = unsafe { MaybeUninit::<libc::msghdr>::zeroed().assume_init() };
        self.try_writev_do(buf, off, |iov, cnt| {
            hdr.msg_iov = iov.cast_mut();
            hdr.msg_iovlen = cnt as usize;
            unsafe { libc::sendmsg(self.fd, &hdr, send_flags) }
        })
    }
}

impl AioFd<'_> {
    /// Asynchronous counterpart of `libc::accept`.
    ///
    /// Calls `try_accept` repeatedly; whenever it reports `EAGAIN` the task
    /// waits for the descriptor to become readable and tries again.
    ///
    /// # Errors
    /// Returns any other `try_accept` error, or an error from `wait_readable`.
    pub async fn accept(&mut self) -> Result<(Fd, SocketAddr)> {
        loop {
            match self.try_accept() {
                Err(e) if e.errno == libc::EAGAIN => self.wait_readable().await?,
                outcome => return outcome,
            }
        }
    }

    /// Asynchronous counterpart of `libc::connect`.
    ///
    /// If `try_connect` reports `EINPROGRESS`, waits until the descriptor is
    /// writable and then returns the result of `get_sock_error`.
    ///
    /// # Errors
    /// Returns any other `try_connect` error, an error from `wait_writable`,
    /// or the pending socket error.
    pub async fn connect(&mut self, addr: &SocketAddr) -> Result<()> {
        match self.try_connect(addr) {
            Ok(()) => return Ok(()),
            Err(e) if e.errno != libc::EINPROGRESS => return Err(e),
            // Connection still in progress: fall through and wait.
            Err(_) => {}
        }
        self.wait_writable().await?;
        self.get_sock_error()
    }

    /// Asynchronous counterpart of `libc::recv`.
    ///
    /// Returns once at least one byte has been received, unless the peer has
    /// disconnected. The buffer may be filled by several receives, so for
    /// DGRAM sockets use `try_recv` instead.
    ///
    /// `MSG_DONTWAIT` is always added to `flags`.
    pub async fn recv(&mut self, buf: &mut [u8], flags: i32) -> Result<usize> {
        let recv_flags = flags | libc::MSG_DONTWAIT;
        self.do_read(buf, false, |fd, chunk| unsafe {
            libc::recv(fd, mut_buf(chunk), chunk.len(), recv_flags)
        })
        .await
    }

    /// Asynchronous counterpart of `libc::recvfrom`.
    ///
    /// Returns once at least one byte has been received, unless the peer has
    /// disconnected. The buffer may be filled by several receives, so for
    /// DGRAM sockets use `try_recvfrom` instead.
    ///
    /// `MSG_DONTWAIT` is always added to `flags`. The second tuple element is
    /// the address buffer that was passed to the call.
    pub async fn recvfrom(&mut self, buf: &mut [u8], flags: i32) -> Result<(usize, SocketAddr)> {
        let recv_flags = flags | libc::MSG_DONTWAIT;
        let mut sender = SocketAddr::uninit();
        let (sender_ptr, mut sender_len) = sender.get_uninit_mut();
        let received = self
            .do_read(buf, false, |fd, chunk| unsafe {
                libc::recvfrom(
                    fd,
                    mut_buf(chunk),
                    chunk.len(),
                    recv_flags,
                    sender_ptr,
                    &mut sender_len,
                )
            })
            .await?;
        Ok((received, sender))
    }

    /// Asynchronous counterpart of `libc::recvmsg`.
    ///
    /// Returns once at least one byte has been received, unless the peer has
    /// disconnected. The buffers may be filled by several receives, so for
    /// DGRAM sockets use `try_recvmsg` instead.
    ///
    /// `off` is the starting offset within `buf`; `MSG_DONTWAIT` is always
    /// added to `flags`.
    pub async fn recvmsg(&mut self, buf: &[&mut [u8]], off: usize, flags: i32) -> Result<usize> {
        let recv_flags = flags | libc::MSG_DONTWAIT;
        // SAFETY: a zero-filled `msghdr` (null pointers, zero lengths) is a valid value.
        let mut hdr = unsafe { MaybeUninit::<libc::msghdr>::zeroed().assume_init() };
        self.do_readv(buf, off, false, |fd, iov, cnt| {
            hdr.msg_iov = iov.cast_mut();
            hdr.msg_iovlen = cnt as usize;
            unsafe { libc::recvmsg(fd, &mut hdr, recv_flags) }
        })
        .await
    }

    /// Asynchronous counterpart of `libc::recvmsg` that also reports the
    /// sender address.
    ///
    /// Returns once at least one byte has been received, unless the peer has
    /// disconnected. The buffers may be filled by several receives, so for
    /// DGRAM sockets use `try_recvmsgfrom` instead.
    ///
    /// `off` is the starting offset within `buf`; `MSG_DONTWAIT` is always
    /// added to `flags`.
    pub async fn recvmsgfrom(
        &mut self,
        buf: &[&mut [u8]],
        off: usize,
        flags: i32,
    ) -> Result<(usize, SocketAddr)> {
        let recv_flags = flags | libc::MSG_DONTWAIT;
        let mut sender = SocketAddr::uninit();
        let (sender_ptr, sender_len) = sender.get_uninit_mut();
        // SAFETY: a zero-filled `msghdr` (null pointers, zero lengths) is a valid value.
        let mut hdr = unsafe { MaybeUninit::<libc::msghdr>::zeroed().assume_init() };
        hdr.msg_name = sender_ptr.cast::<libc::c_void>();
        hdr.msg_namelen = sender_len;
        let received = self
            .do_readv(buf, off, false, |fd, iov, cnt| {
                hdr.msg_iov = iov.cast_mut();
                hdr.msg_iovlen = cnt as usize;
                unsafe { libc::recvmsg(fd, &mut hdr, recv_flags) }
            })
            .await?;
        Ok((received, sender))
    }

    /// Asynchronous counterpart of `libc::recv` that fills the whole buffer.
    ///
    /// Returns only after the requested length has been received, unless the
    /// peer has disconnected. The buffer may be filled by several receives, so
    /// for DGRAM sockets use `try_recv` instead.
    ///
    /// `MSG_DONTWAIT` is always added to `flags`.
    pub async fn recv_all(&mut self, buf: &mut [u8], flags: i32) -> Result<usize> {
        let recv_flags = flags | libc::MSG_DONTWAIT;
        self.do_read(buf, true, |fd, chunk| unsafe {
            libc::recv(fd, mut_buf(chunk), chunk.len(), recv_flags)
        })
        .await
    }

    /// Asynchronous counterpart of `libc::recvfrom` that fills the whole buffer.
    ///
    /// Returns only after the requested length has been received, unless the
    /// peer has disconnected. The buffer may be filled by several receives, so
    /// for DGRAM sockets use `try_recvfrom` instead.
    ///
    /// `MSG_DONTWAIT` is always added to `flags`. The second tuple element is
    /// the address buffer that was passed to the call.
    pub async fn recvfrom_all(
        &mut self,
        buf: &mut [u8],
        flags: i32,
    ) -> Result<(usize, SocketAddr)> {
        let recv_flags = flags | libc::MSG_DONTWAIT;
        let mut sender = SocketAddr::uninit();
        let (sender_ptr, mut sender_len) = sender.get_uninit_mut();
        let received = self
            .do_read(buf, true, |fd, chunk| unsafe {
                libc::recvfrom(
                    fd,
                    mut_buf(chunk),
                    chunk.len(),
                    recv_flags,
                    sender_ptr,
                    &mut sender_len,
                )
            })
            .await?;
        Ok((received, sender))
    }

    /// Asynchronous counterpart of `libc::recvmsg` that fills all buffers.
    ///
    /// Returns only after the requested length has been received, unless the
    /// peer has disconnected. The buffers may be filled by several receives,
    /// so for DGRAM sockets use `try_recvmsg` instead.
    ///
    /// `off` is the starting offset within `buf`; `MSG_DONTWAIT` is always
    /// added to `flags`.
    pub async fn recvmsg_all(
        &mut self,
        buf: &[&mut [u8]],
        off: usize,
        flags: i32,
    ) -> Result<usize> {
        let recv_flags = flags | libc::MSG_DONTWAIT;
        // SAFETY: a zero-filled `msghdr` (null pointers, zero lengths) is a valid value.
        let mut hdr = unsafe { MaybeUninit::<libc::msghdr>::zeroed().assume_init() };
        self.do_readv(buf, off, true, |fd, iov, cnt| {
            hdr.msg_iov = iov.cast_mut();
            hdr.msg_iovlen = cnt as usize;
            unsafe { libc::recvmsg(fd, &mut hdr, recv_flags) }
        })
        .await
    }

    /// Asynchronous counterpart of `libc::recvmsg` that fills all buffers and
    /// also reports the sender address.
    ///
    /// Passes `true` to `do_readv`, like the other `*_all` receive methods, so
    /// it is expected to return only after the requested length has been
    /// received unless the peer has disconnected. The buffers may be filled by
    /// several receives, so for DGRAM sockets use `try_recvmsgfrom` instead.
    ///
    /// `off` is the starting offset within `buf`; `MSG_DONTWAIT` is always
    /// added to `flags`.
    pub async fn recvmsgfrom_all(
        &mut self,
        buf: &[&mut [u8]],
        off: usize,
        flags: i32,
    ) -> Result<(usize, SocketAddr)> {
        let recv_flags = flags | libc::MSG_DONTWAIT;
        let mut sender = SocketAddr::uninit();
        let (sender_ptr, sender_len) = sender.get_uninit_mut();
        // SAFETY: a zero-filled `msghdr` (null pointers, zero lengths) is a valid value.
        let mut hdr = unsafe { MaybeUninit::<libc::msghdr>::zeroed().assume_init() };
        hdr.msg_name = sender_ptr.cast::<libc::c_void>();
        hdr.msg_namelen = sender_len;
        let received = self
            .do_readv(buf, off, true, |fd, iov, cnt| {
                hdr.msg_iov = iov.cast_mut();
                hdr.msg_iovlen = cnt as usize;
                unsafe { libc::recvmsg(fd, &mut hdr, recv_flags) }
            })
            .await?;
        Ok((received, sender))
    }

    /// Asynchronous counterpart of `libc::send`.
    ///
    /// Returns once at least one byte has been sent. The data may be split
    /// across several sends, so for datagram sockets prefer `try_send`.
    ///
    /// `MSG_DONTWAIT` is always added to `flags`.
    pub async fn send(&mut self, buf: &[u8], flags: i32) -> Result<usize> {
        let send_flags = flags | libc::MSG_DONTWAIT;
        self.do_write(buf, false, |fd, chunk| unsafe {
            libc::send(fd, const_buf(chunk), chunk.len(), send_flags)
        })
        .await
    }

    /// Asynchronous counterpart of `libc::sendto`, sending to `dst`.
    ///
    /// Returns once at least one byte has been sent. The data may be split
    /// across several sends, so for datagram sockets prefer `try_sendto`.
    ///
    /// `MSG_DONTWAIT` is always added to `flags`.
    pub async fn sendto(&mut self, buf: &[u8], flags: i32, dst: &SocketAddr) -> Result<usize> {
        let (dst_addr, dst_len) = dst.get();
        let send_flags = flags | libc::MSG_DONTWAIT;
        self.do_write(buf, false, |fd, chunk| unsafe {
            libc::sendto(fd, const_buf(chunk), chunk.len(), send_flags, dst_addr, dst_len)
        })
        .await
    }

    /// Asynchronous counterpart of `libc::sendmsg`.
    ///
    /// Returns once at least one byte has been sent. The data may be split
    /// across several sends, so for datagram sockets prefer `try_sendmsg`.
    ///
    /// `off` is the starting offset within `buf`; `MSG_DONTWAIT` is always
    /// added to `flags`.
    pub async fn sendmsg(&mut self, buf: &[&[u8]], off: usize, flags: i32) -> Result<usize> {
        let send_flags = flags | libc::MSG_DONTWAIT;
        // SAFETY: a zero-filled `msghdr` (null pointers, zero lengths) is a valid value.
        let mut hdr = unsafe { MaybeUninit::<libc::msghdr>::zeroed().assume_init() };
        self.do_writev(buf, off, false, |fd, iov, cnt| {
            hdr.msg_iov = iov.cast_mut();
            hdr.msg_iovlen = cnt as usize;
            unsafe { libc::sendmsg(fd, &hdr, send_flags) }
        })
        .await
    }

    /// Asynchronous counterpart of `libc::sendmsg` with an explicit
    /// destination address.
    ///
    /// Returns once at least one byte has been sent. The data may be split
    /// across several sends, so for datagram sockets prefer `try_sendmsgto`.
    ///
    /// `off` is the starting offset within `buf`. `MSG_DONTWAIT` is always
    /// added to `flags`, as in every other send/receive method of this type,
    /// so the underlying call cannot block the calling thread even if the
    /// descriptor has been switched to blocking mode.
    pub async fn sendmsgto(
        &mut self,
        buf: &[&[u8]],
        off: usize,
        flags: i32,
        dst: &SocketAddr,
    ) -> Result<usize> {
        let flags = flags | libc::MSG_DONTWAIT;
        let mut msg = unsafe { MaybeUninit::<libc::msghdr>::zeroed().assume_init_read() };
        let (addr, addrlen) = dst.get();
        msg.msg_name = (addr as *const libc::sockaddr)
            .cast_mut()
            .cast::<libc::c_void>();
        msg.msg_namelen = addrlen;
        self.do_writev(buf, off, false, |fd, piov, iovcnt| {
            msg.msg_iov = piov.cast_mut();
            msg.msg_iovlen = iovcnt as usize;
            unsafe { libc::sendmsg(fd, &msg, flags) }
        })
        .await
    }

    /// Asynchronous counterpart of `libc::send`, `*_all` variant.
    ///
    /// Passes `true` to `do_write` (the non-`_all` variant passes `false`).
    /// The data may be split across several sends, so for datagram sockets
    /// prefer `try_send`.
    ///
    /// `MSG_DONTWAIT` is always added to `flags`.
    pub async fn send_all(&mut self, buf: &[u8], flags: i32) -> Result<usize> {
        let send_flags = flags | libc::MSG_DONTWAIT;
        self.do_write(buf, true, |fd, chunk| unsafe {
            libc::send(fd, const_buf(chunk), chunk.len(), send_flags)
        })
        .await
    }

    /// Asynchronous counterpart of `libc::sendto`, `*_all` variant.
    ///
    /// Passes `true` to `do_write` (the non-`_all` variant passes `false`).
    /// The data may be split across several sends, so for datagram sockets
    /// prefer `try_sendto`.
    ///
    /// `MSG_DONTWAIT` is always added to `flags`.
    pub async fn sendto_all(&mut self, buf: &[u8], flags: i32, dst: &SocketAddr) -> Result<usize> {
        let (dst_addr, dst_len) = dst.get();
        let send_flags = flags | libc::MSG_DONTWAIT;
        self.do_write(buf, true, |fd, chunk| unsafe {
            libc::sendto(fd, const_buf(chunk), chunk.len(), send_flags, dst_addr, dst_len)
        })
        .await
    }

    /// Asynchronous counterpart of `libc::sendmsg`, `*_all` variant.
    ///
    /// Passes `true` to `do_writev` (the non-`_all` variant passes `false`).
    /// The data may be split across several sends, so for datagram sockets
    /// prefer `try_sendmsg`.
    ///
    /// `off` is the starting offset within `buf`; `MSG_DONTWAIT` is always
    /// added to `flags`.
    pub async fn sendmsg_all(&mut self, buf: &[&[u8]], off: usize, flags: i32) -> Result<usize> {
        let send_flags = flags | libc::MSG_DONTWAIT;
        // SAFETY: a zero-filled `msghdr` (null pointers, zero lengths) is a valid value.
        let mut hdr = unsafe { MaybeUninit::<libc::msghdr>::zeroed().assume_init() };
        self.do_writev(buf, off, true, |fd, iov, cnt| {
            hdr.msg_iov = iov.cast_mut();
            hdr.msg_iovlen = cnt as usize;
            unsafe { libc::sendmsg(fd, &hdr, send_flags) }
        })
        .await
    }

    /// Asynchronous counterpart of `libc::sendmsg` with an explicit
    /// destination address, `*_all` variant.
    ///
    /// Passes `true` to `do_writev` (the non-`_all` variant passes `false`).
    /// The data may be split across several sends, so for datagram sockets
    /// prefer `try_sendmsgto`.
    ///
    /// `off` is the starting offset within `buf`. `MSG_DONTWAIT` is always
    /// added to `flags`, as in every other send/receive method of this type,
    /// so the underlying call cannot block the calling thread even if the
    /// descriptor has been switched to blocking mode.
    pub async fn sendmsgto_all(
        &mut self,
        buf: &[&[u8]],
        off: usize,
        flags: i32,
        dst: &SocketAddr,
    ) -> Result<usize> {
        let flags = flags | libc::MSG_DONTWAIT;
        let mut msg = unsafe { MaybeUninit::<libc::msghdr>::zeroed().assume_init_read() };
        let (addr, addrlen) = dst.get();
        msg.msg_name = (addr as *const libc::sockaddr)
            .cast_mut()
            .cast::<libc::c_void>();
        msg.msg_namelen = addrlen;
        self.do_writev(buf, off, true, |fd, piov, iovcnt| {
            msg.msg_iov = piov.cast_mut();
            msg.msg_iovlen = iovcnt as usize;
            unsafe { libc::sendmsg(fd, &msg, flags) }
        })
        .await
    }
}

impl Fd {
    /// Relays data in both directions between `self` and `other`.
    ///
    /// Convenience wrapper: wraps both descriptors in an [`AioFd`] and
    /// delegates to [`AioFd::copy_bidirectional`].
    ///
    /// - `buf`: scratch buffer used for the transfers.
    /// - `linger`: timeout of the user-space linger mechanism.
    ///
    /// Forwarding keeps working while the connection is only open in one
    /// direction. Returns the number of bytes successfully forwarded in each
    /// direction. The caller decides whether and how to enable `wait_entry`.
    pub async fn copy_bidirectional<'b>(
        &self,
        other: &Self,
        buf: &mut [u8],
        linger: Duration,
    ) -> (u64, u64) {
        let mut near = AioFd::new(self);
        let mut far = AioFd::new(other);
        near.copy_bidirectional(&mut far, buf, linger).await
    }
}

impl<'a> AioFd<'a> {
    /// Relays data in both directions between `self` and `other` until both
    /// sides are closed.
    ///
    /// - `buf`: scratch buffer shared by both directions.
    /// - `linger`: timeout of the user-space linger mechanism (passed to
    ///   `ProxyFd::copy_to`).
    ///
    /// Forwarding keeps working while the connection is only open in one
    /// direction. Returns `(bytes self -> other, bytes other -> self)`, i.e.
    /// the number of bytes successfully forwarded in each direction. The
    /// caller decides whether and how to enable `wait_entry`.
    pub async fn copy_bidirectional<'b>(
        &'a mut self,
        other: &'b mut AioFd<'b>,
        buf: &mut [u8],
        linger: Duration,
    ) -> (u64, u64) {
        // Lingering is implemented in user space (see `linger`), so the kernel
        // SO_LINGER timeout is set to zero on BOTH sockets.
        // NOTE(review): `other` previously used a 9-second kernel linger while
        // `self` used 0; that asymmetry looked like a typo and was unified.
        let no_kernel_linger = Some(Duration::new(0, 0));
        self.set_linger(no_kernel_linger);
        other.set_linger(no_kernel_linger);

        let mut left = ProxyFd::new(self);
        let mut right = ProxyFd::new(other);
        let mut bytes_left = 0;
        let mut bytes_right = 0;
        // Try both directions once before waiting for readiness.
        let (mut rd_left, mut rd_right) = (true, true);

        while !left.closed || !right.closed {
            if rd_left {
                bytes_left += left.copy_to(&mut right, buf, linger).await;
            }
            if rd_right {
                bytes_right += right.copy_to(&mut left, buf, linger).await;
            }
            // Learn which side(s) should be read on the next round.
            (rd_left, rd_right) = left.wait_read_or(&mut right).await;
        }
        (bytes_left, bytes_right)
    }
}

/// One endpoint of a bidirectional relay: an async descriptor plus a flag
/// recording that this endpoint is finished.
struct ProxyFd<'a> {
    /// Async handle used for all I/O on this endpoint.
    aio: &'a mut AioFd<'a>,
    /// Set once this endpoint should no longer be read from or written to.
    closed: bool,
}

impl<'a> ProxyFd<'a> {
    /// Wraps `aio` as an endpoint that is initially not closed.
    fn new(aio: &'a mut AioFd<'a>) -> Self {
        Self { aio, closed: false }
    }

    /// Waits for readability on whichever endpoints are still open.
    ///
    /// Returns `(read_self, read_other)`: which side(s) the caller should try
    /// to read next. Errors from the waits are ignored here.
    async fn wait_read_or(&mut self, other: &mut ProxyFd<'_>) -> (bool, bool) {
        if !self.closed && !other.closed {
            // Both open: wait on both and record which wait(s) fired.
            // NOTE(review): presumably `ready` runs its callback when that wait
            // completes and `or` resolves when either side does — confirm
            // against `crate::runtime::Extensions`.
            let mut rd_left = false;
            let mut rd_right = false;
            let _ = self
                .aio
                .wait_readable()
                .ready(|_| rd_left = true)
                .or(other.aio.wait_readable().ready(|_| rd_right = true))
                .await;
            (rd_left, rd_right)
        } else if !self.closed {
            // Only `self` is still open.
            let _ = self.aio.wait_readable().await;
            (true, false)
        } else if !other.closed {
            // Only `other` is still open.
            let _ = other.aio.wait_readable().await;
            (false, true)
        } else {
            (false, false)
        }
    }

    /// Forwards data from `self` to `other` until `self` has nothing more to
    /// read right now (`EAGAIN`), reaches end of stream, or an error occurs.
    ///
    /// Returns the number of bytes that `send_all` reported as written to
    /// `other`. `linger` is handed to `deadline` when draining an endpoint.
    async fn copy_to(&mut self, other: &mut ProxyFd<'_>, buf: &mut [u8], linger: Duration) -> u64 {
        let mut bytes = 0_u64;
        loop {
            match self.aio.try_recv(buf, 0) {
                // Got data: push all of it to the other side.
                Ok(len @ 1..) => match other.aio.send_all(&buf[..len], 0).await {
                    Ok(size) => bytes += size as u64,
                    Err(_) => {
                        // Cannot write to `other` any more: mark it closed and
                        // drain `self`.
                        // NOTE(review): presumably `deadline` bounds the drain
                        // by `linger` — confirm against `Extensions`.
                        other.closed = true;
                        self.close(buf).deadline(linger).await;
                        break;
                    }
                },
                // End of stream on `self`: propagate the half-close to `other`
                // and stop reading from `self`; shutdown errors are ignored.
                Ok(0) => {
                    let _ = other.aio.shutdown(SHUT_WR);
                    let _ = self.aio.shutdown(SHUT_RD);
                    self.closed = true;
                    break;
                }
                // Nothing to read right now; the caller waits for readiness.
                Err(Error {
                    errno: hierr::EAGAIN,
                }) => break,
                // Read error on `self`: half-close `other`, mark `self` closed
                // and drain `other` (which also marks `other` closed).
                Err(_) => {
                    let _ = other.aio.shutdown(SHUT_WR);
                    self.closed = true;
                    other.close(buf).deadline(linger).await;
                    break;
                }
            }
        }
        bytes
    }

    /// Marks this endpoint closed, then reads and discards incoming data until
    /// end of stream or a read error other than `EAGAIN`.
    async fn close(&mut self, buf: &mut [u8]) {
        self.closed = true;
        loop {
            match self.aio.try_recv(buf, 0) {
                Ok(0) => return,
                // Data received while draining is thrown away.
                Ok(_) => continue,
                Err(Error {
                    errno: hierr::EAGAIN,
                }) => {
                    // Wait for more data; a failed wait is ignored and the
                    // next `try_recv` decides what happens.
                    let _ = self.aio.wait_readable().await;
                }
                Err(_) => return,
            }
        }
    }
}

/// A TCP socket whose connection attempt has been started by
/// `Fd::tcp_connect` / `Fd::tcp_connect_then`.
pub struct TcpClient {
    /// The underlying socket.
    fd: Fd,
    /// `true` when the connect call already succeeded.
    connected: bool,
}

impl TcpClient {
    /// Bundles a socket with its current connection state.
    fn new(fd: Fd, connected: bool) -> Self {
        Self { fd, connected }
    }

    /// Waits until the connection is established and yields the socket.
    ///
    /// On success returns the `Fd` together with a cookie value that can be
    /// passed to `AioFd::new_with` to speed up later registration of async
    /// I/O events. The cookie is 0 when the socket was already connected and
    /// no waiting was necessary.
    ///
    /// # Errors
    /// Returns an error from `wait_writable` or the pending socket error
    /// reported by `get_sock_error`.
    pub async fn wait_connected(self) -> Result<(Fd, usize)> {
        if self.connected {
            return Ok((self.fd, 0));
        }
        let cookie = {
            // The async handle borrows `self.fd`, so keep it in an inner
            // scope and let it drop before the descriptor is moved out.
            let mut aio = AioFd::new(&self.fd);
            aio.wait_writable().await?;
            aio.get_sock_error()?;
            let cookie = aio.wait_cookie();
            cookie
        };
        Ok((self.fd, cookie))
    }
}
